package com.lxl.microservicesimpleprovideruser.controller;

import com.google.common.collect.Maps;
import com.lxl.microservicesimpleprovideruser.bean.User;
import com.lxl.microservicesimpleprovideruser.service.AggregationServcie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Observable;
import rx.Observer;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * @desc: Aggregation controller. Combines the results of two asynchronous
 *        lookups on {@link AggregationServcie} into a single HTTP response.
 * @date & @author: 2018/11/29 16:52 & lxl
 */
@Controller
public class AggregationController {

    private static final Logger logger = LoggerFactory.getLogger(AggregationController.class);

    @Resource
    private AggregationServcie aggregationServcie;

    /**
     * Gets the aggregated user data for the given id.
     *
     * <p>The response body is produced asynchronously: the returned
     * {@link DeferredResult} is completed once the aggregated map is emitted,
     * or completed with an error if the aggregation fails.
     *
     * @param id user id taken from the request path
     * @return DeferredResult that will hold a map with the keys {@code "user"} and {@code "movieUser"}
     */
    @GetMapping("/aggregate/{id}")
    public @ResponseBody
    DeferredResult<HashMap<String, User>> aggregate(@PathVariable("id") Long id) {
        Observable<HashMap<String, User>> hashMapObservable = this.aggregateObservable(id);
        return this.toDeferredResult(hashMapObservable);
    }


    /**
     * Zips the items emitted by the two service Observables and transforms each
     * pair into a map.
     *
     * @param id user id
     * @return Observable emitting a map that stores the first source's item under
     *         {@code "user"} and the second source's item under {@code "movieUser"}
     */
    public Observable<HashMap<String, User>> aggregateObservable(long id) {
        // zip pairs up the emissions of both sources and applies the combiner to each pair.
        return Observable.zip(
                this.aggregationServcie.getUserById(id),
                this.aggregationServcie.getMovieUserByUserId(id),
                (user, movieUser) -> {
                    HashMap<String, User> map = Maps.newHashMap();
                    map.put("user", user);
                    map.put("movieUser", movieUser);
                    return map;
                }
        );
    }

    /**
     * Adapts an Observable of aggregated data to a Spring {@link DeferredResult}.
     *
     * <p>An emitted item completes the deferred result with that value. A failure
     * of the Observable is logged and completes the deferred result with the
     * error, so the pending request is answered instead of being left open until
     * the async timeout.
     *
     * @param details Observable emitting the aggregated data
     * @return DeferredResult completed by the Observable's item or by its error
     */
    public DeferredResult<HashMap<String, User>> toDeferredResult(Observable<HashMap<String, User>> details) {
        DeferredResult<HashMap<String, User>> result = new DeferredResult<>();

        // Subscribe and forward the Observable's signals to the deferred result.
        details.subscribe(new Observer<HashMap<String, User>>() {
            @Override
            public void onCompleted() {
                logger.info("-----------数据聚合完成");
            }

            @Override
            public void onError(Throwable throwable) {
                logger.error("-----------数据聚合时发生错误!", throwable);
                // Logging alone would leave the request hanging; hand the failure to
                // Spring MVC so it is processed like an exception from the handler.
                result.setErrorResult(throwable);
            }

            @Override
            public void onNext(HashMap<String, User> stringUserHashMap) {
                result.setResult(stringUserHashMap);
            }
        });

        return result;
    }

}
